Redisson2.0源码分析5-单机以及主从连接管理
Redisson2.0源码分析5-单机以及主从连接管理
单节点模式跟主从节点模式有类似的地方,所以放一起进行分析。
1. 单机模式
1.1 单机模式连接配置以及初始化 SingleConnectionManager
配置单节点 Redis 服务器连接信息。
package org.redisson.connection;
import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.SingleServerConfig;
/**
 * Connection manager for a single Redis server.
 *
 * It reuses the master/slave machinery of {@link MasterSlaveConnectionManager}
 * by translating the single-server settings into a
 * {@link MasterSlaveServersConfig} whose master address is the single server.
 */
public class SingleConnectionManager extends MasterSlaveConnectionManager {

    /**
     * Builds the manager from single-server settings.
     *
     * @param cfg    single-server connection settings
     * @param config global configuration passed on to {@code init}
     */
    public SingleConnectionManager(SingleServerConfig cfg, Config config) {
        init(toMasterSlaveConfig(cfg), config);
    }

    /**
     * Copies every single-server setting into a fresh master/slave config.
     *
     * @param cfg single-server connection settings
     * @return equivalent master/slave configuration
     */
    private static MasterSlaveServersConfig toMasterSlaveConfig(SingleServerConfig cfg) {
        MasterSlaveServersConfig masterSlaveConfig = new MasterSlaveServersConfig();

        // The single server is addressed as "host:port" and plays the master role.
        String masterAddress = cfg.getAddress().getHost() + ":" + cfg.getAddress().getPort();

        // Retry and timeout settings.
        masterSlaveConfig.setRetryAttempts(cfg.getRetryAttempts());
        masterSlaveConfig.setRetryInterval(cfg.getRetryInterval());
        masterSlaveConfig.setTimeout(cfg.getTimeout());
        masterSlaveConfig.setPingTimeout(cfg.getPingTimeout());

        // Per-connection setup: password, database index, client name.
        masterSlaveConfig.setPassword(cfg.getPassword());
        masterSlaveConfig.setDatabase(cfg.getDatabase());
        masterSlaveConfig.setClientName(cfg.getClientName());

        // Address and pool sizing. The subscription pool size is stored in the
        // "slave subscription" setting of the master/slave config.
        masterSlaveConfig.setMasterAddress(masterAddress);
        masterSlaveConfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize());
        masterSlaveConfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());
        masterSlaveConfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());

        return masterSlaveConfig;
    }

    /**
     * Registers one {@link SingleEntry} covering slots 0..MAX_SLOT, stored
     * under the key MAX_SLOT.
     */
    @Override
    protected void initEntry(MasterSlaveServersConfig config) {
        entries.put(MAX_SLOT, new SingleEntry(0, MAX_SLOT, this, config));
    }
}
1.2 单机模式连接管理 SingleEntry
这个类继承了 MasterSlaveEntry,主要管理跟单节点 Redis 服务器的连接。
主要作用:
- 连接的创建。
- 连接池的维护。
- 管理发布订阅连接。
package org.redisson.connection;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommands;
/**
 * {@link MasterSlaveEntry} variant for a single Redis server.
 *
 * The master entry is a {@link SubscribesConnectionEntry}, so the same client
 * serves regular commands and publish/subscribe connections. Read operations
 * are delegated to the write (master) path.
 */
public class SingleEntry extends MasterSlaveEntry {

    public SingleEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
        super(startSlot, endSlot, connectionManager, config);
    }

    /**
     * Creates the client for the given address and installs it as the master
     * entry, sized with the master connection pool size and the subscription
     * connection pool size from the config.
     */
    @Override
    public void setupMasterEntry(String host, int port) {
        RedisClient masterClient = connectionManager.createClient(host, port);
        masterEntry = new SubscribesConnectionEntry(masterClient, config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize());
    }

    /** Returns the current master entry viewed as a subscription-capable entry. */
    private SubscribesConnectionEntry subscribeEntry() {
        return (SubscribesConnectionEntry) masterEntry;
    }

    /**
     * Takes one permit from the subscribe-connection semaphore.
     *
     * Tries a non-blocking acquire first. If the pool is exhausted it logs a
     * warning and then blocks, uninterruptibly, until a permit is available.
     */
    private void acquireSubscribeConnection() {
        if (!subscribeEntry().getSubscribeConnectionsSemaphore().tryAcquire()) {
            log.warn("Subscribe connection pool gets exhausted! Trying to acquire connection ...");
            long time = System.currentTimeMillis();
            subscribeEntry().getSubscribeConnectionsSemaphore().acquireUninterruptibly();
            long endTime = System.currentTimeMillis() - time;
            log.warn("Subscribe connection acquired, time spended: {} ms", endTime);
        }
    }

    /**
     * Returns a publish/subscribe connection, reusing a free one if available
     * and otherwise opening a new one.
     *
     * A new connection is initialised in this order: AUTH (when a password is
     * configured), SELECT (when the database is not 0), CLIENT SETNAME (when a
     * client name is configured).
     *
     * @return a pub/sub connection backed by one semaphore permit
     * @throws RuntimeException whatever connecting or the handshake commands
     *         throw; the permit is released before the exception propagates
     */
    @Override
    RedisPubSubConnection nextPubSubConnection() {
        acquireSubscribeConnection();

        // Prefer an idle connection from the free-connection pool.
        RedisPubSubConnection conn = subscribeEntry().pollFreeSubscribeConnection();
        if (conn != null) {
            return conn;
        }

        try {
            conn = masterEntry.getClient().connectPubSub();
            if (config.getPassword() != null) {
                conn.sync(RedisCommands.AUTH, config.getPassword());
            }
            if (config.getDatabase() != 0) {
                conn.sync(RedisCommands.SELECT, config.getDatabase());
            }
            if (config.getClientName() != null) {
                conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
            }
            return conn;
        } catch (RuntimeException e) {
            // Release the permit on ANY failure, not only on connection errors.
            // A handshake command may fail with a different runtime exception,
            // and the permit would otherwise never return to the pool.
            subscribeEntry().getSubscribeConnectionsSemaphore().release();
            throw e;
        }
    }

    /**
     * Hands a subscription connection back: the connection goes to the
     * free-connection pool first, then its semaphore permit is released.
     */
    @Override
    public void returnSubscribeConnection(PubSubConnectionEntry entry) {
        // Put the connection back into the free-connection pool.
        subscribeEntry().offerFreeSubscribeConnection(entry.getConnection());
        // Make the permit available to the next caller.
        subscribeEntry().getSubscribeConnectionsSemaphore().release();
    }

    /** Reads use the same connection path as writes. */
    @Override
    public RedisConnection connectionReadOp() {
        return super.connectionWriteOp();
    }

    /** Read connections came from the write path, so release them through it. */
    @Override
    public void releaseRead(RedisConnection connection) {
        super.releaseWrite(connection);
    }
}
2. 主从模式
2.1 主从模式连接配置以及初始化 MasterSlaveConnectionManager
这个主从模式的连接管理类开始变得更加复杂起来,主要作用有:
- 创建、关闭跟 Redis 服务的连接。
- Redis 数据槽的计算以及实例路由。
- 发布订阅管理。
- 主从切换以及故障恢复。
- 连接池管理。
这里有个 Redis 知识点需要注意:Redis 的主从复制本身不会自动进行故障切换,主节点宕机后,从节点不会自动选举出新的主节点。因此这个类里才会有这么多逻辑——相当于由客户端来完成主从的故障切换,实现了 Redis 哨兵模式的一部分功能。
package org.redisson.connection;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisEmptySlotException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
/**
 * Connection manager for a master/slave Redis deployment.
 *
 * Responsibilities visible in this class:
 * <ul>
 *   <li>creating and shutting down Redis clients;</li>
 *   <li>computing the hash slot of a key and routing to the
 *       {@link MasterSlaveEntry} registered for that slot;</li>
 *   <li>managing publish/subscribe connections per channel name;</li>
 *   <li>reacting to slave-down and master-change events;</li>
 *   <li>handing out and taking back read/write connections.</li>
 * </ul>
 */
public class MasterSlaveConnectionManager implements ConnectionManager {

    /** Number of hash slots in a Redis cluster. */
    static final int MAX_SLOT = 16384;

    private final Logger log = LoggerFactory.getLogger(getClass());

    /** Netty hashed-wheel timer used for timeouts scheduled through {@link #newTimeout}. */
    private HashedWheelTimer timer;

    /** Codec taken from the global configuration. */
    protected Codec codec;

    /** Netty event loop group shared by all clients created by this manager. */
    protected EventLoopGroup group;

    /** Socket channel implementation matching {@link #group} (epoll or NIO). */
    protected Class<? extends SocketChannel> socketChannelClass;

    /** Channel (or pattern) name mapped to the pub/sub connection entry serving it. */
    protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();

    /** Master/slave server configuration. */
    protected MasterSlaveServersConfig config;

    /** Master/slave entries keyed by slot number; looked up with {@code ceilingEntry(slot)}. */
    protected final NavigableMap<Integer, MasterSlaveEntry> entries = new ConcurrentSkipListMap<Integer, MasterSlaveEntry>();

    /** Latch released by the release listeners and closed in {@link #shutdown()}. */
    private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();

    /** Clients created through {@link #createClient(String, int)}. */
    private final Set<RedisClientEntry> clients = Collections.newSetFromMap(new ConcurrentHashMap<RedisClientEntry, Boolean>());

    /** For subclasses that call {@code init} themselves. */
    MasterSlaveConnectionManager() {
    }

    @Override
    public HashedWheelTimer getTimer() {
        return timer;
    }

    @Override
    public MasterSlaveServersConfig getConfig() {
        return config;
    }

    @Override
    public Codec getCodec() {
        return codec;
    }

    @Override
    public NavigableMap<Integer, MasterSlaveEntry> getEntries() {
        return entries;
    }

    public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
        init(cfg, config);
    }

    /**
     * Initialises transport settings first (event loop, channel class, codec),
     * then the master/slave settings (timer and slot entries).
     */
    protected void init(MasterSlaveServersConfig config, Config cfg) {
        init(cfg);
        init(config);
    }

    /**
     * Stores the config, creates the timer and registers the slot entries.
     *
     * The timer tick is 100 ms when the smaller of retry interval and timeout
     * is a multiple of 100 ms. Otherwise the tick is half of the remainder,
     * for finer precision.
     */
    protected void init(MasterSlaveServersConfig config) {
        this.config = config;

        int minTimeout = Math.min(config.getRetryInterval(), config.getTimeout());
        if (minTimeout % 100 != 0) {
            // Half of the remainder is 0 when the remainder is 1, and
            // HashedWheelTimer rejects a non-positive tick, so clamp to 1 ms.
            int tickMillis = Math.max(1, (minTimeout % 100) / 2);
            timer = new HashedWheelTimer(tickMillis, TimeUnit.MILLISECONDS);
        } else {
            timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS);
        }

        initEntry(config);
    }

    /** Registers one entry covering slots 0..MAX_SLOT under the key MAX_SLOT. */
    protected void initEntry(MasterSlaveServersConfig config) {
        MasterSlaveEntry entry = new MasterSlaveEntry(0, MAX_SLOT, this, config);
        entries.put(MAX_SLOT, entry);
    }

    /**
     * Picks the Netty transport: epoll when the configuration asks for the
     * Linux native transport, NIO otherwise. Also stores the codec.
     */
    protected void init(Config cfg) {
        if (cfg.isUseLinuxNativeEpoll()) {
            this.group = new EpollEventLoopGroup(cfg.getThreads());
            this.socketChannelClass = EpollSocketChannel.class;
        } else {
            this.group = new NioEventLoopGroup(cfg.getThreads());
            this.socketChannelClass = NioSocketChannel.class;
        }
        this.codec = cfg.getCodec();
    }

    /** Creates a client with the configured timeout and records it in {@link #clients}. */
    @Override
    public RedisClient createClient(String host, int port) {
        RedisClient client = createClient(host, port, config.getTimeout());
        clients.add(new RedisClientEntry(client));
        return client;
    }

    /** Removes the client from {@link #clients} and shuts it down asynchronously. */
    public void shutdownAsync(RedisClient client) {
        clients.remove(new RedisClientEntry(client));
        client.shutdownAsync();
    }

    /** Creates a client with an explicit timeout; the client is NOT recorded in {@link #clients}. */
    @Override
    public RedisClient createClient(String host, int port, int timeout) {
        return new RedisClient(group, socketChannelClass, host, port, timeout);
    }

    /**
     * Builds a listener that, once the operation completes, releases the
     * shutdown latch, cancels the timeout task and returns the write connection.
     */
    @Override
    public <T> FutureListener<T> createReleaseWriteListener(final int slot,
            final RedisConnection conn, final Timeout timeout) {
        return new FutureListener<T>() {
            @Override
            public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
                shutdownLatch.release();
                timeout.cancel();
                releaseWrite(slot, conn);
            }
        };
    }

    /**
     * Builds a listener that, once the operation completes, releases the
     * shutdown latch, cancels the timeout task and returns the read connection.
     */
    @Override
    public <T> FutureListener<T> createReleaseReadListener(final int slot,
            final RedisConnection conn, final Timeout timeout) {
        return new FutureListener<T>() {
            @Override
            public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
                shutdownLatch.release();
                timeout.cancel();
                releaseRead(slot, conn);
            }
        };
    }

    /**
     * Computes the hash slot of a key: CRC16 of the key modulo {@link #MAX_SLOT}.
     *
     * Hash tags follow the Redis Cluster rule. If the key contains a
     * <code>{</code>, and a <code>}</code> appears somewhere after it with at
     * least one character in between, only the text between the first
     * <code>{</code> and the first following <code>}</code> is hashed. In
     * every other case (no closing brace, a brace only before the opening
     * one, or an empty tag such as "{}") the whole key is hashed.
     *
     * @param key key to route; may be {@code null}
     * @return 0 when only one entry is registered or the key is {@code null},
     *         otherwise the slot in the range 0..MAX_SLOT-1
     */
    @Override
    public int calcSlot(String key) {
        if (entries.size() == 1 || key == null) {
            return 0;
        }

        int start = key.indexOf('{');
        if (start != -1) {
            // Search for '}' only after the '{'; a missing or earlier '}' used to
            // make substring() throw StringIndexOutOfBoundsException.
            int end = key.indexOf('}', start + 1);
            if (end > start + 1) {
                key = key.substring(start + 1, end);
            }
        }

        // NOTE(review): getBytes() uses the platform default charset; confirm it
        // matches the encoding used when keys are sent to the server.
        int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
        log.debug("slot {} for {}", result, key);
        return result;
    }

    @Override
    public PubSubConnectionEntry getEntry(String channelName) {
        return name2PubSubConnection.get(channelName);
    }

    /**
     * Subscribes to a channel and returns the entry serving it.
     *
     * Order of attempts: reuse the entry already mapped to the channel; else
     * try to acquire a slot on any existing pub/sub entry; else open a new
     * pub/sub connection. If another thread maps the channel first, that
     * thread's entry is returned. If the chosen entry turns out to be
     * inactive, the method releases it and retries recursively.
     */
    @Override
    public PubSubConnectionEntry subscribe(String channelName) {
        PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
        if (connEntry != null) {
            return connEntry;
        }

        // Snapshot of distinct entries; one connection may serve several channels.
        Set<PubSubConnectionEntry> candidates = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
        for (PubSubConnectionEntry entry : candidates) {
            if (entry.tryAcquire()) {
                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
                if (oldEntry != null) {
                    // Lost the race: give the slot back and use the winner's entry.
                    entry.release();
                    return oldEntry;
                }

                synchronized (entry) {
                    if (!entry.isActive()) {
                        entry.release();
                        return subscribe(channelName);
                    }
                    entry.subscribe(codec, channelName);
                    return entry;
                }
            }
        }

        // No existing entry had capacity: open a new connection (slot 0 is used).
        int slot = 0;
        RedisPubSubConnection conn = nextPubSubConnection(slot);

        PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
        entry.tryAcquire();
        PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
        if (oldEntry != null) {
            // Lost the race: return the new connection and use the winner's entry.
            releaseSubscribeConnection(slot, entry);
            return oldEntry;
        }

        synchronized (entry) {
            if (!entry.isActive()) {
                entry.release();
                return subscribe(channelName);
            }
            entry.subscribe(codec, channelName);
            return entry;
        }
    }

    /**
     * Pattern-subscribe counterpart of {@link #subscribe(String)}; same flow,
     * but issues {@code psubscribe} on the chosen entry.
     */
    @Override
    public PubSubConnectionEntry psubscribe(String channelName) {
        PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
        if (connEntry != null) {
            return connEntry;
        }

        Set<PubSubConnectionEntry> candidates = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
        for (PubSubConnectionEntry entry : candidates) {
            if (entry.tryAcquire()) {
                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
                if (oldEntry != null) {
                    entry.release();
                    return oldEntry;
                }

                synchronized (entry) {
                    if (!entry.isActive()) {
                        entry.release();
                        return psubscribe(channelName);
                    }
                    entry.psubscribe(codec, channelName);
                    return entry;
                }
            }
        }

        int slot = 0;
        RedisPubSubConnection conn = nextPubSubConnection(slot);

        PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
        entry.tryAcquire();
        PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
        if (oldEntry != null) {
            releaseSubscribeConnection(slot, entry);
            return oldEntry;
        }

        synchronized (entry) {
            if (!entry.isActive()) {
                entry.release();
                return psubscribe(channelName);
            }
            entry.psubscribe(codec, channelName);
            return entry;
        }
    }

    /**
     * Subscribes a listener to a channel.
     *
     * Same flow as {@link #subscribe(String)}. When another thread maps the
     * channel concurrently, the listener is attached to that thread's entry,
     * exactly as when the mapping already existed on entry to the method.
     * Previously the listener was silently dropped in that case.
     */
    @Override
    public void subscribe(RedisPubSubListener listener, String channelName) {
        PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
        if (connEntry != null) {
            connEntry.subscribe(codec, listener, channelName);
            return;
        }

        Set<PubSubConnectionEntry> candidates = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
        for (PubSubConnectionEntry entry : candidates) {
            if (entry.tryAcquire()) {
                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
                if (oldEntry != null) {
                    entry.release();
                    // Lost the race: register the listener on the winning entry.
                    oldEntry.subscribe(codec, listener, channelName);
                    return;
                }

                synchronized (entry) {
                    if (!entry.isActive()) {
                        entry.release();
                        subscribe(listener, channelName);
                        return;
                    }
                    entry.subscribe(codec, listener, channelName);
                    return;
                }
            }
        }

        int slot = 0;
        RedisPubSubConnection conn = nextPubSubConnection(slot);

        PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
        entry.tryAcquire();
        PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
        if (oldEntry != null) {
            releaseSubscribeConnection(slot, entry);
            // Lost the race: register the listener on the winning entry.
            oldEntry.subscribe(codec, listener, channelName);
            return;
        }

        synchronized (entry) {
            if (!entry.isActive()) {
                entry.release();
                subscribe(listener, channelName);
                return;
            }
            entry.subscribe(codec, listener, channelName);
            return;
        }
    }

    /**
     * Removes the channel mapping and unsubscribes. When the UNSUBSCRIBE status
     * for this channel arrives and the entry can be closed, the connection is
     * returned to the pool of slot 0.
     */
    @Override
    public void unsubscribe(final String channelName) {
        final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
        if (entry == null) {
            return;
        }

        entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
            @Override
            public boolean onStatus(PubSubType type, String channel) {
                if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
                    synchronized (entry) {
                        if (entry.tryClose()) {
                            releaseSubscribeConnection(0, entry);
                        }
                    }
                    // The status was for this channel and has been handled.
                    return true;
                }
                return false;
            }
        });
    }

    /**
     * Pattern counterpart of {@link #unsubscribe(String)}; waits for the
     * PUNSUBSCRIBE status instead.
     */
    @Override
    public void punsubscribe(final String channelName) {
        final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
        if (entry == null) {
            return;
        }

        entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
            @Override
            public boolean onStatus(PubSubType type, String channel) {
                if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
                    synchronized (entry) {
                        if (entry.tryClose()) {
                            releaseSubscribeConnection(0, entry);
                        }
                    }
                    // The status was for this pattern and has been handled.
                    return true;
                }
                return false;
            }
        });
    }

    /** Returns the entry with the smallest key greater than or equal to {@code slot}. */
    protected MasterSlaveEntry getEntry(int slot) {
        return entries.ceilingEntry(slot).getValue();
    }

    /**
     * Handles a slave going down: notifies the slot's entry, then moves every
     * channel that was served by one of the affected pub/sub connections to a
     * new subscription, re-attaching its listeners.
     */
    protected void slaveDown(int slot, String host, int port) {
        Collection<RedisPubSubConnection> allPubSubConnections = getEntry(slot).slaveDown(host, port);

        for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) {
            for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) {
                PubSubConnectionEntry entry = mapEntry.getValue();
                String channelName = mapEntry.getKey();

                // Only channels served by an affected connection need to move.
                if (!entry.getConnection().equals(redisPubSubConnection)) {
                    continue;
                }

                synchronized (entry) {
                    entry.close();

                    // Capture listeners before unsubscribing so they can be re-attached.
                    Collection<RedisPubSubListener> listeners = entry.getListeners(channelName);
                    unsubscribe(channelName);
                    if (!listeners.isEmpty()) {
                        PubSubConnectionEntry newEntry = subscribe(channelName);
                        for (RedisPubSubListener redisPubSubListener : listeners) {
                            newEntry.addListener(channelName, redisPubSubListener);
                        }
                        log.debug("resubscribed listeners for '{}' channel", channelName);
                    }
                }
            }
        }
    }

    /** Switches the master of the entry responsible for {@code endSlot}. */
    protected void changeMaster(int endSlot, String host, int port) {
        getEntry(endSlot).changeMaster(host, port);
    }

    /** Removes and returns the entry registered under {@code endSlot}. */
    protected MasterSlaveEntry removeMaster(int endSlot) {
        return entries.remove(endSlot);
    }

    /**
     * Returns a connection for a write operation on the given slot.
     *
     * @throws RedisEmptySlotException when the resolved entry does not own the slot
     */
    @Override
    public RedisConnection connectionWriteOp(int slot) {
        MasterSlaveEntry e = getEntry(slot);
        if (!e.isOwn(slot)) {
            throw new RedisEmptySlotException("No node for slot: " + slot, slot);
        }
        return e.connectionWriteOp();
    }

    /**
     * Returns a connection for a read operation on the given slot.
     *
     * @throws RedisEmptySlotException when the resolved entry does not own the slot
     */
    @Override
    public RedisConnection connectionReadOp(int slot) {
        MasterSlaveEntry e = getEntry(slot);
        if (!e.isOwn(slot)) {
            throw new RedisEmptySlotException("No node for slot: " + slot, slot);
        }
        return e.connectionReadOp();
    }

    RedisPubSubConnection nextPubSubConnection(int slot) {
        return getEntry(slot).nextPubSubConnection();
    }

    protected void releaseSubscribeConnection(int slot, PubSubConnectionEntry entry) {
        this.getEntry(slot).returnSubscribeConnection(entry);
    }

    @Override
    public void releaseWrite(int slot, RedisConnection connection) {
        getEntry(slot).releaseWrite(connection);
    }

    @Override
    public void releaseRead(int slot, RedisConnection connection) {
        getEntry(slot).releaseRead(connection);
    }

    /**
     * Shuts the manager down: closes the shutdown latch and waits on it, shuts
     * down every entry, stops the timer, then shuts down the event loop group
     * and waits for that to finish.
     */
    @Override
    public void shutdown() {
        shutdownLatch.closeAndAwaitUninterruptibly();
        for (MasterSlaveEntry entry : entries.values()) {
            entry.shutdown();
        }
        timer.stop();
        group.shutdownGracefully().syncUninterruptibly();
    }

    /** Returns a read-only view of the recorded clients. */
    public Collection<RedisClientEntry> getClients() {
        return Collections.unmodifiableCollection(clients);
    }

    @Override
    public <R> Promise<R> newPromise() {
        return group.next().newPromise();
    }

    @Override
    public EventLoopGroup getGroup() {
        return group;
    }

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        return timer.newTimeout(task, delay, unit);
    }

    public InfinitySemaphoreLatch getShutdownLatch() {
        return shutdownLatch;
    }
}
2.2 主从模式连接管理 MasterSlaveEntry
用于管理 Redis 主从结构中的连接。这里面的代码逻辑比较复杂,建议先阅读 Redis 主从复制相关的文档,以便更好地理解。
主要是实现如下功能:
- 动态地切换主节点。 跟 Redis 哨兵有点类似,可以在主节点宕机时进行故障转移。
- 负载均衡。
相当于是实现了简单版本的哨兵功能,但是还不是特别的完善。
package org.redisson.connection;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Manages the connections of one master/slave group that serves a range of
 * hash slots.
 *
 * Writes go to the master entry; reads and pub/sub connections are chosen by
 * the configured {@link LoadBalancer}. The entry also lets the master be
 * switched at runtime and lets individual nodes be frozen or unfrozen in the
 * balancer.
 */
public class MasterSlaveEntry {

    final Logger log = LoggerFactory.getLogger(getClass());

    /** Balancer that selects a node for read and pub/sub connections. */
    LoadBalancer slaveBalancer;

    /** Connection entry of the current master; replaced by {@link #setupMasterEntry}. */
    volatile ConnectionEntry masterEntry;

    final MasterSlaveServersConfig config;
    final ConnectionManager connectionManager;

    /** First slot of the range this entry owns (inclusive). */
    final int startSlot;

    /** Last slot of the range this entry owns (inclusive). */
    final int endSlot;

    /**
     * Creates the entry and registers every configured node, slaves and master,
     * with the balancer. When more than one slave address is configured, the
     * master is then frozen in the balancer. Finally the master entry is set up.
     */
    public MasterSlaveEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
        this.startSlot = startSlot;
        this.endSlot = endSlot;
        this.connectionManager = connectionManager;
        this.config = config;

        slaveBalancer = config.getLoadBalancer();
        slaveBalancer.init(config, connectionManager);

        // The master is added to the balancer alongside the slaves.
        List<URI> addresses = new ArrayList<URI>(config.getSlaveAddresses());
        addresses.add(config.getMasterAddress());
        for (URI address : addresses) {
            RedisClient client = connectionManager.createClient(address.getHost(), address.getPort());
            slaveBalancer.add(new SubscribesConnectionEntry(client,
                    this.config.getSlaveConnectionPoolSize(),
                    this.config.getSlaveSubscriptionConnectionPoolSize()));
        }
        if (config.getSlaveAddresses().size() > 1) {
            slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
        }

        setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
    }

    /** Creates a client for the given address and installs it as the master entry. */
    public void setupMasterEntry(String host, int port) {
        RedisClient client = connectionManager.createClient(host, port);
        masterEntry = new ConnectionEntry(client, config.getMasterConnectionPoolSize());
    }

    /**
     * Freezes the given node in the balancer. If that leaves no available
     * client, the master is unfrozen so the balancer still has a node.
     *
     * @return the pub/sub connections reported by the balancer for the frozen node
     */
    public Collection<RedisPubSubConnection> slaveDown(String host, int port) {
        Collection<RedisPubSubConnection> conns = slaveBalancer.freeze(host, port);
        if (slaveBalancer.getAvailableClients() == 0) {
            slaveUp(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort());
        }
        return conns;
    }

    /**
     * Adds a slave node to the balancer in frozen state; it stays frozen until
     * {@link #slaveUp(String, int)} is called for it.
     */
    public void addSlave(String host, int port) {
        RedisClient client = connectionManager.createClient(host, port);
        SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client,
                this.config.getSlaveConnectionPoolSize(),
                this.config.getSlaveSubscriptionConnectionPoolSize());
        entry.setFreezed(true);
        slaveBalancer.add(entry);
    }

    /** Returns the client of the current master. */
    public RedisClient getClient() {
        return masterEntry.getClient();
    }

    /**
     * Unfreezes the given node in the balancer. If the node is not the current
     * master, the master is frozen first.
     *
     * A node differs from the master when the host OR the port differs. The
     * previous check required both to differ, so a slave running on the
     * master's host with a different port was treated as the master itself.
     */
    public void slaveUp(String host, int port) {
        RedisClient master = masterEntry.getClient();
        boolean isMaster = master.getAddr().getHostName().equals(host) && port == master.getAddr().getPort();
        if (!isMaster) {
            slaveDown(master.getAddr().getHostName(), master.getAddr().getPort());
        }
        slaveBalancer.unfreeze(host, port);
    }

    /**
     * Promotes the given node to master. The new master entry is installed
     * first. If more than one client is still available in the balancer, the
     * new master is frozen there. The old master's client is then shut down
     * asynchronously.
     */
    public void changeMaster(String host, int port) {
        ConnectionEntry oldMaster = masterEntry;
        setupMasterEntry(host, port);
        if (slaveBalancer.getAvailableClients() > 1) {
            slaveDown(host, port);
        }
        connectionManager.shutdownAsync(oldMaster.getClient());
    }

    /** Asynchronously shuts down the master client and the balancer. */
    public void shutdownMasterAsync() {
        connectionManager.shutdownAsync(masterEntry.getClient());
        slaveBalancer.shutdownAsync();
    }

    /**
     * Returns a master connection for a write operation.
     *
     * A permit is taken from the master semaphore first. A pooled connection
     * is returned if one is available; otherwise a new one is opened.
     *
     * @throws RedisException if connecting fails; the permit is released first
     */
    public RedisConnection connectionWriteOp() {
        acquireMasterConnection();

        // Reuse a pooled connection when there is one.
        RedisConnection conn = masterEntry.getConnections().poll();
        if (conn != null) {
            return conn;
        }

        try {
            return masterEntry.connect(config);
        } catch (RedisException e) {
            masterEntry.getConnectionsSemaphore().release();
            throw e;
        }
    }

    /** Returns a connection for a read operation, chosen by the balancer. */
    public RedisConnection connectionReadOp() {
        return slaveBalancer.nextConnection();
    }

    RedisPubSubConnection nextPubSubConnection() {
        return slaveBalancer.nextPubSubConnection();
    }

    /**
     * Takes one permit from the master connection semaphore. Tries a
     * non-blocking acquire first; if that fails it logs a warning and blocks,
     * uninterruptibly, until a permit is available.
     */
    void acquireMasterConnection() {
        if (!masterEntry.getConnectionsSemaphore().tryAcquire()) {
            log.warn("Master connection pool gets exhausted! Trying to acquire connection ...");
            long time = System.currentTimeMillis();
            masterEntry.getConnectionsSemaphore().acquireUninterruptibly();
            long endTime = System.currentTimeMillis() - time;
            log.warn("Master connection acquired, time spended: {} ms", endTime);
        }
    }

    public void returnSubscribeConnection(PubSubConnectionEntry entry) {
        slaveBalancer.returnSubscribeConnection(entry.getConnection());
    }

    /**
     * Returns a write connection. A connection that belongs to a client other
     * than the current master's (the master may have changed meanwhile) is
     * closed asynchronously instead of being pooled.
     */
    public void releaseWrite(RedisConnection connection) {
        // may changed during changeMaster call
        if (!masterEntry.getClient().equals(connection.getRedisClient())) {
            connection.closeAsync();
            return;
        }

        masterEntry.getConnections().add(connection);
        masterEntry.getConnectionsSemaphore().release();
    }

    /** Returns a read connection to the balancer. */
    public void releaseRead(RedisConnection connection) {
        slaveBalancer.returnConnection(connection);
    }

    /** Shuts down the master client and the balancer. */
    public void shutdown() {
        masterEntry.getClient().shutdown();
        slaveBalancer.shutdown();
    }

    public int getEndSlot() {
        return endSlot;
    }

    public int getStartSlot() {
        return startSlot;
    }

    /** Tells whether {@code slot} lies within this entry's inclusive slot range. */
    public boolean isOwn(int slot) {
        return slot >= startSlot && slot <= endSlot;
    }
}