七的博客

Redisson2.0源码分析5-单机以及主从连接管理

源码分析

Redisson2.0源码分析5-单机以及主从连接管理

单节点模式跟主从节点模式有类似的地方,所以放一起进行分析。

1. 单机模式

1.1 单机模式连接配置以及初始化 SingleConnectionManager

配置单节点 Redis 服务器连接信息。

package org.redisson.connection;

import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.SingleServerConfig;

public class SingleConnectionManager extends MasterSlaveConnectionManager {

    // 负责将单节点配置转换成 MasterSlaveServersConfig  形式
    public SingleConnectionManager(SingleServerConfig cfg, Config config) {
        MasterSlaveServersConfig newconfig = new MasterSlaveServersConfig();
        // 节点地址
        String addr = cfg.getAddress().getHost() + ":" + cfg.getAddress().getPort();
        // 重试次数
        newconfig.setRetryAttempts(cfg.getRetryAttempts());
        newconfig.setRetryInterval(cfg.getRetryInterval());  // 重试间隔时间
        newconfig.setTimeout(cfg.getTimeout());  // 连接超时时间
        newconfig.setPingTimeout(cfg.getPingTimeout());  // Ping超时时间
        newconfig.setPassword(cfg.getPassword());  // 连接密码
        newconfig.setDatabase(cfg.getDatabase());  // 数据的选择
        newconfig.setClientName(cfg.getClientName());
        newconfig.setMasterAddress(addr);  // 主节点地址
        newconfig.setMasterConnectionPoolSize(cfg.getConnectionPoolSize());  // 连接池大小
        newconfig.setSubscriptionsPerConnection(cfg.getSubscriptionsPerConnection());  // 每个连接的订阅数
        newconfig.setSlaveSubscriptionConnectionPoolSize(cfg.getSubscriptionConnectionPoolSize());

        init(newconfig, config);
    }

    @Override
    protected void initEntry(MasterSlaveServersConfig config) {
        SingleEntry entry = new SingleEntry(0, MAX_SLOT, this, config);
        entries.put(MAX_SLOT, entry);
    }

}

1.2 单机模式连接管理 SingleEntry

这个类继承了 MasterSlaveEntry,主要管理跟单节点 Redis 服务器的连接。

主要作用:

  • 连接的创建。
  • 连接池的维护。
  • 管理发布订阅连接。
package org.redisson.connection;

import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommands;

public class SingleEntry extends MasterSlaveEntry {

    public SingleEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
        super(startSlot, endSlot, connectionManager, config);
    }

    @Override
    public void setupMasterEntry(String host, int port) {
        // 设置一些连接信息
        RedisClient masterClient = connectionManager.createClient(host, port);
        masterEntry = new SubscribesConnectionEntry(masterClient, config.getMasterConnectionPoolSize(), config.getSlaveSubscriptionConnectionPoolSize());
    }

	
    // 获取一个发布订阅连接
    private void acquireSubscribeConnection() {
        // 获取订阅连接的信号量,如果拿不到就会一直等待
        if (!((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().tryAcquire()) {
            log.warn("Subscribe connection pool gets exhausted! Trying to acquire connection ...");
            long time = System.currentTimeMillis();
            ((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().acquireUninterruptibly();
            long endTime = System.currentTimeMillis() - time;
            log.warn("Subscribe connection acquired, time spended: {} ms", endTime);
        }
    }

     // 获取下一个可用的发布订阅连接
    @Override
    RedisPubSubConnection nextPubSubConnection() {
        acquireSubscribeConnection();

        RedisPubSubConnection conn = ((SubscribesConnectionEntry)masterEntry).pollFreeSubscribeConnection();
         // 从空闲订阅连接池中获取连接
        if (conn != null) {
            return conn;  // 如果获取成功,返回连接
        }

        try {
            // 创建一个新的连接
            conn = masterEntry.getClient().connectPubSub();
            // 做一些初始化操作。依次执行 认证 > 切换库  > 设置客户端名称
            if (config.getPassword() != null) {
                conn.sync(RedisCommands.AUTH, config.getPassword());
            }
            if (config.getDatabase() != 0) {
                conn.sync(RedisCommands.SELECT, config.getDatabase());
            }
            if (config.getClientName() != null) {
                conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
            }

            return conn;
        } catch (RedisConnectionException e) {
            ((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().release();
            throw e;
        }
    }

    @Override
    public void returnSubscribeConnection(PubSubConnectionEntry entry) {
        // 归还订阅连接
        ((SubscribesConnectionEntry)masterEntry).offerFreeSubscribeConnection(entry.getConnection());
        // 将连接放回空闲连接池
        ((SubscribesConnectionEntry)masterEntry).getSubscribeConnectionsSemaphore().release();
    }

    @Override
    public RedisConnection connectionReadOp() {
        return super.connectionWriteOp();
    }

    @Override
    public void releaseRead(RedisConnection сonnection) {
        super.releaseWrite(сonnection);
    }
}

2. 主从模式

2.1 主从模式连接配置以及初始化 MasterSlaveConnectionManager

这个主从模式的连接管理类开始变得更加复杂起来,主要作用有:

  • 创建、关闭跟 Redis 服务的连接。
  • Redis 数据槽的计算以及实例路由。
  • 发布订阅管理。
  • 主从切换以及故障恢复。
  • 连接池管理。

这里有个 Redis 知识点需要注意下。 Redis 的主从是不会自动进行节点切换的,也就是主节点挂掉之后就是挂了,从节点不会自动选举出主节点来。 所以这个类里面才会有这么多逻辑,这个是相当于客户端进行主从的故障切换,实现了一部分 Redis 哨兵模式的功能。

package org.redisson.connection;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;

import org.redisson.Config;
import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisEmptySlotException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;


public class MasterSlaveConnectionManager implements ConnectionManager {

    static final int MAX_SLOT = 16384;  // Redis集群的最大槽位数
    private final Logger log = LoggerFactory.getLogger(getClass());
    private HashedWheelTimer timer;  // Netty 的时间轮工具
    protected Codec codec;  // 编解码器
    protected EventLoopGroup group; // Netty 的 EventLoopGroup


    protected Class<? extends SocketChannel> socketChannelClass;

    protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();

    protected MasterSlaveServersConfig config;

    protected final NavigableMap<Integer, MasterSlaveEntry> entries = new ConcurrentSkipListMap<Integer, MasterSlaveEntry>();   // 主从服务器配置

    // 控制关闭过程的信号量
    private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();

    //  Redis客户端集合
    private final Set<RedisClientEntry> clients = Collections.newSetFromMap(new ConcurrentHashMap<RedisClientEntry, Boolean>());

    MasterSlaveConnectionManager() {
    }

    @Override
    public HashedWheelTimer getTimer() {
        return timer;
    }

    @Override
    public MasterSlaveServersConfig getConfig() {
        return config;
    }

    @Override
    public Codec getCodec() {
        return codec;
    }

    @Override
    public NavigableMap<Integer, MasterSlaveEntry> getEntries() {
        return entries;
    }

    public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config) {
        init(cfg, config);
    }

    protected void init(MasterSlaveServersConfig config, Config cfg) {
        init(cfg);
        init(config);
    }

    protected void init(MasterSlaveServersConfig config) {
        this.config = config;
        // 计算最小超时时间,确保定时器的精度
        int minTimeout = Math.min(config.getRetryInterval(), config.getTimeout());
        if (minTimeout % 100 != 0) {
            // 超时时间不是100的倍数就设置一个更精细的定时器
            timer = new HashedWheelTimer((minTimeout % 100) / 2, TimeUnit.MILLISECONDS);
        } else {
            // 使用使用100毫秒的精度
            timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS);
        }

        initEntry(config);
    }

    protected void initEntry(MasterSlaveServersConfig config) {
        MasterSlaveEntry entry = new MasterSlaveEntry(0, MAX_SLOT, this, config);
        entries.put(MAX_SLOT, entry);
    }

    protected void init(Config cfg) {
        // 是 Linux 原生的 Epoll,就初始化 Epoll 事件循环组和通道类
        if (cfg.isUseLinuxNativeEpoll()) {
            // linux 平台上优先使用 Epoll 机制
            this.group = new EpollEventLoopGroup(cfg.getThreads());
            this.socketChannelClass = EpollSocketChannel.class;
        } else {
            // 使用通用的 NioSocketChannel
            this.group = new NioEventLoopGroup(cfg.getThreads());
            this.socketChannelClass = NioSocketChannel.class;
        }
        this.codec = cfg.getCodec();
    }

    // 创建新的Redis客户端
    @Override
    public RedisClient createClient(String host, int port) {
        RedisClient client = createClient(host, port, config.getTimeout());
        clients.add(new RedisClientEntry(client));
        return client;
    }

    // 异步关闭
    public void shutdownAsync(RedisClient client) {
        clients.remove(new RedisClientEntry(client));
        client.shutdownAsync();
    }

    @Override
    public RedisClient createClient(String host, int port, int timeout) {
        return new RedisClient(group, socketChannelClass, host, port, timeout);
    }

    @Override
    public <T> FutureListener<T> createReleaseWriteListener(final int slot,
                                    final RedisConnection conn, final Timeout timeout) {
        return new FutureListener<T>() {
            @Override
            public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
                // 释放关闭信号量
                shutdownLatch.release();
                // 取消超时任务
                timeout.cancel();
                // 释放写连接
                releaseWrite(slot, conn);
            }
        };
    }

    @Override
    public <T> FutureListener<T> createReleaseReadListener(final int slot,
                                    final RedisConnection conn, final Timeout timeout) {
        return new FutureListener<T>() {
            @Override
            public void operationComplete(io.netty.util.concurrent.Future<T> future) throws Exception {
                shutdownLatch.release();
                timeout.cancel();
                releaseRead(slot, conn);
            }
        };
    }

    // 计算 Redis 集群中键的槽位,这里需要了解到几个集群的知识点
    // Redis 集群使用槽位来分布数据,一共有 16384 个槽位
    // 每个键根据其槽位被分配到不同的节点上,实现数据的均匀存储。
    @Override
    public int calcSlot(String key) {
        // 如果只有一个条目或键为空,返回槽位 0
        if (entries.size() == 1 || key == null) {
            return 0;
        }

        // 如果 key 包含 {},对大括号内的这部分计算哈希。
        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}');
            // 取出 {} 中的内容去计算哈希值
            key = key.substring(start+1, end);
        }

        // CRC16 算法计算键的哈希值,这是 Redis 用于分配槽位的标准方法
        // 对 MAX_SLOT 取模,将哈希值映射到可用的槽位范围
        int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
        log.debug("slot {} for {}", result, key);
        
        // 返回槽位
        return result;
    }

    @Override
    public PubSubConnectionEntry getEntry(String channelName) {
        return name2PubSubConnection.get(channelName);
    }

    // 订阅某个 channel
    @Override
    public PubSubConnectionEntry subscribe(String channelName) {
        // m允许每个Pub/Sub连接有多个 channel 名称
        PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
        if (сonnEntry != null) {
            // 如果已经订阅过了,直接返回已有的连接即可。
            return сonnEntry;
        }

        Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
        for (PubSubConnectionEntry entry : entries) {
            // 尝试获取连接条目
            if (entry.tryAcquire()) {
                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
                
                // 如果这个 channel 已经被其他连接占用,直接返回旧的条目
                if (oldEntry != null) {
                    entry.release();
                    return oldEntry;
                }
				
                // 加锁防止被修改
                synchronized (entry) {
                    // 条目不活跃状态处理
                    if (!entry.isActive()) {
                        entry.release();
                        // 条目不活跃,递归调用重新订阅
                        return subscribe(channelName);
                    }
                    // 调用条目的订阅方法
                    entry.subscribe(codec, channelName);
                    return entry;
                }
            }
        }

        // 默认槽位为0
        int slot = 0;
        // 获取下一个Pub/Sub连接
        RedisPubSubConnection conn = nextPubSubConnection(slot);

        PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
         // 尝试获取连接
        entry.tryAcquire();
        PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
        if (oldEntry != null) {
            // 如果已存在旧的条目,就释放当前连接
            releaseSubscribeConnection(slot, entry);
            return oldEntry;
        }

        synchronized (entry) {
            if (!entry.isActive()) {
                // 如果条目不活跃,就递归调用重新订阅
                entry.release();
                return subscribe(channelName);
            }
            entry.subscribe(codec, channelName);
            return entry;
        }
    }
	
    // 模式匹配的发布订阅,套路跟上面差不多
    @Override
    public PubSubConnectionEntry psubscribe(String channelName) {
        // 允许每个Pub/Sub连接有多个频道名称
        PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
        if (сonnEntry != null) {
            return сonnEntry;
        }

        Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
        for (PubSubConnectionEntry entry : entries) {
            if (entry.tryAcquire()) {
                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
                if (oldEntry != null) {
                    entry.release();
                    return oldEntry;
                }

                synchronized (entry) {
                    if (!entry.isActive()) {
                        entry.release();
                        return psubscribe(channelName);
                    }
                    entry.psubscribe(codec, channelName);
                    return entry;
                }
            }
        }

        // 默认槽位为0
        int slot = 0;
        RedisPubSubConnection conn = nextPubSubConnection(slot);

        PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
        entry.tryAcquire();
        PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
        if (oldEntry != null) {
            releaseSubscribeConnection(slot, entry);
            return oldEntry;
        }

        synchronized (entry) {
            if (!entry.isActive()) {
                entry.release();
                return psubscribe(channelName);
            }
            entry.psubscribe(codec, channelName);
            return entry;
        }
    }

    // 下面几个方法套路都差不多,不再各个解释
    @Override
    public void subscribe(RedisPubSubListener listener, String channelName) {
        PubSubConnectionEntry сonnEntry = name2PubSubConnection.get(channelName);
        if (сonnEntry != null) {
            сonnEntry.subscribe(codec, listener, channelName);
            return;
        }

        Set<PubSubConnectionEntry> entries = new HashSet<PubSubConnectionEntry>(name2PubSubConnection.values());
        for (PubSubConnectionEntry entry : entries) {
            if (entry.tryAcquire()) {
                PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
                if (oldEntry != null) {
                    entry.release();
                    return;
                }
                synchronized (entry) {
                    if (!entry.isActive()) {
                        entry.release();
                        subscribe(listener, channelName);
                        return;
                    }
                    entry.subscribe(codec, listener, channelName);
                    return;
                }
            }
        }

        int slot = 0;
        RedisPubSubConnection conn = nextPubSubConnection(slot);

        PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
        entry.tryAcquire();
        PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
        if (oldEntry != null) {
            releaseSubscribeConnection(slot, entry);
            return;
        }
        synchronized (entry) {
            if (!entry.isActive()) {
                entry.release();
                subscribe(listener, channelName);
                return;
            }
            entry.subscribe(codec, listener, channelName);
            return;
        }
    }

    @Override
    public void unsubscribe(final String channelName) {
        final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
        if (entry == null) {
            return;
        }

        entry.unsubscribe(channelName, new BaseRedisPubSubListener() {
			
            // 配置状态变更回调
            @Override
            public boolean onStatus(PubSubType type, String channel) {
                // 如果收到了当前 channel 的取消订阅消息
                if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
                    synchronized (entry) {
                        // 尝试关闭连接条目
                        if (entry.tryClose()) {
                            // 归还到连接池
                            releaseSubscribeConnection(0, entry);
                        }
                    }
                    // true 表示成功处理了该状态
                    return true;
                }
                //  false 表示未处理该状态
                return false;
            }

        });
    }

    @Override
    public void punsubscribe(final String channelName) {
        final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
        if (entry == null) {
            return;
        }

        entry.punsubscribe(channelName, new BaseRedisPubSubListener() {
			// 配置取消模式匹配变更回调
            @Override
            public boolean onStatus(PubSubType type, String channel) {
                 // 如果收到了当前 channel 的取消订阅消息
                if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
                    synchronized (entry) {
                         // 尝试关闭连接条目
                        if (entry.tryClose()) {
                              // 归还到连接池
                            releaseSubscribeConnection(0, entry);
                        }
                    }
                    return true;   // true 表示成功处理了该状态
                }
                return false;    //  false 表示未处理该状态
            }

        });
    }

    protected MasterSlaveEntry getEntry(int slot) {
        return entries.ceilingEntry(slot).getValue();
    }	

    // 处理从节点下线的逻辑
    protected void slaveDown(int slot, String host, int port) {
        Collection<RedisPubSubConnection> allPubSubConnections = getEntry(slot).slaveDown(host, port);

        // 重新将监听器附加到其他 channel
        for (Entry<String, PubSubConnectionEntry> mapEntry : name2PubSubConnection.entrySet()) {
            for (RedisPubSubConnection redisPubSubConnection : allPubSubConnections) {
                PubSubConnectionEntry entry = mapEntry.getValue();
                String channelName = mapEntry.getKey();
				
                // 如果当前的 PubSubConnectionEntry 的连接与下线的 RedisPubSubConnection 不匹配,跳过
                if (!entry.getConnection().equals(redisPubSubConnection)) {
                    continue;
                }

                synchronized (entry) {
                    entry.close();  // 关闭当前的 PubSubConnectionEntry

                    Collection<RedisPubSubListener> listeners = entry.getListeners(channelName);
                    unsubscribe(channelName);  // 取消当前频道的订阅
                    if (!listeners.isEmpty()) {
                        // 重新订阅频道,并将所有监听器重新绑定到新的连接上
                        PubSubConnectionEntry newEntry = subscribe(channelName);
                        for (RedisPubSubListener redisPubSubListener : listeners) {
                            newEntry.addListener(channelName, redisPubSubListener);
                        }
                        log.debug("resubscribed listeners for '{}' channel", channelName);
                    }
                }
            }
        }
    }

    protected void changeMaster(int endSlot, String host, int port) {
         // 更改指定槽位的主节点
        getEntry(endSlot).changeMaster(host, port);
    }

    protected MasterSlaveEntry removeMaster(int endSlot) {
        // 移除指定槽位的主从条目
        return entries.remove(endSlot);
    }

    // 写操作的连接
    @Override
    public RedisConnection connectionWriteOp(int slot) {
        MasterSlaveEntry e = getEntry(slot);
        if (!e.isOwn(slot)) {
            // 如果槽位没有对应的节点,抛出异常
            throw new RedisEmptySlotException("No node for slot: " + slot, slot);
        }
        return e.connectionWriteOp();
    }

    // 读操作的连接
    @Override
    public RedisConnection connectionReadOp(int slot) {
        MasterSlaveEntry e = getEntry(slot);
        if (!e.isOwn(slot)) {
             // 如果槽位没有对应的节点,抛出异常
            throw new RedisEmptySlotException("No node for slot: " + slot, slot);
        }
        return e.connectionReadOp();
    }

    RedisPubSubConnection nextPubSubConnection(int slot) {
        return getEntry(slot).nextPubSubConnection();
    }

    protected void releaseSubscribeConnection(int slot, PubSubConnectionEntry entry) {
        this.getEntry(slot).returnSubscribeConnection(entry);
    }

    @Override
    public void releaseWrite(int slot, RedisConnection connection) {
        getEntry(slot).releaseWrite(connection);
    }

    @Override
    public void releaseRead(int slot, RedisConnection connection) {
        getEntry(slot).releaseRead(connection);
    }

    @Override
    public void shutdown() {
        shutdownLatch.closeAndAwaitUninterruptibly();
        for (MasterSlaveEntry entry : entries.values()) {
            entry.shutdown();
        }
        timer.stop();
        group.shutdownGracefully().syncUninterruptibly();
    }

    public Collection<RedisClientEntry> getClients() {
        return Collections.unmodifiableCollection(clients);
    }

    @Override
    public <R> Promise<R> newPromise() {
        return group.next().newPromise();
    }

    @Override
    public EventLoopGroup getGroup() {
        return group;
    }

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        return timer.newTimeout(task, delay, unit);
    }

    public InfinitySemaphoreLatch getShutdownLatch() {
        return shutdownLatch;
    }

}

2.2 主从模式连接管理 MasterSlaveEntry

用于管理Redis主从结构中的连接。 这个里面代码逻辑很复杂,要去读下主从相关的文档,更好理解。

主要是实现如下功能:

  • 动态地切换主节点。 跟 Redis 哨兵有点类似,可以主节点宕机时进行故障转移。
  • 负载均衡。

相当于是实现了简单版本的哨兵功能,但是还不是特别的完善。

package org.redisson.connection;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MasterSlaveEntry {

    final Logger log = LoggerFactory.getLogger(getClass());

    LoadBalancer slaveBalancer;  // 负载均衡器,用于管理从节点的连接
    volatile ConnectionEntry masterEntry;  // 主节点的连接条目

    final MasterSlaveServersConfig config;  // 主从配置
    final ConnectionManager connectionManager; 

    final int startSlot;  //  起始槽位,用于标识Redis集群的槽位范围
    final int endSlot; //  结束槽位

    public MasterSlaveEntry(int startSlot, int endSlot, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
        this.startSlot = startSlot;
        this.endSlot = endSlot;
        this.connectionManager = connectionManager;
        this.config = config;

        slaveBalancer = config.getLoadBalancer();
        slaveBalancer.init(config, connectionManager);

        List<URI> addresses = new ArrayList<URI>(config.getSlaveAddresses());
        addresses.add(config.getMasterAddress());  // 添加主节点地址
        for (URI address : addresses) {
            // 为每个地址创建Redis客户端
            RedisClient client = connectionManager.createClient(address.getHost(), address.getPort());
            slaveBalancer.add(new SubscribesConnectionEntry(client,
                    this.config.getSlaveConnectionPoolSize(),
                    this.config.getSlaveSubscriptionConnectionPoolSize()));
        }

        // 如果有多个从节点,将主节点标记为下线
        if (config.getSlaveAddresses().size() > 1) {
            slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
        }

        setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
    }

    public void setupMasterEntry(String host, int port) {
        // 创建主节点的Redis客户端
        RedisClient client = connectionManager.createClient(host, port);
        masterEntry = new ConnectionEntry(client, config.getMasterConnectionPoolSize());
    }

    public Collection<RedisPubSubConnection> slaveDown(String host, int port) {
        Collection<RedisPubSubConnection> conns = slaveBalancer.freeze(host, port);
         // 冻结指定从节点的连接,并返回所有相关的Pub/Sub连接
        if (slaveBalancer.getAvailableClients() == 0) {
            // 如果没有可用的从节点,将主节点标记为上线
            slaveUp(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort());
        }
        return conns;
    }
	
    // 添加从节点
    // 默认情况下会冻结,到需要的时候才会解冻
    public void addSlave(String host, int port) {
        // 创建新的从节点客户端
        RedisClient client = connectionManager.createClient(host, port);
        SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client,
                this.config.getSlaveConnectionPoolSize(),
                this.config.getSlaveSubscriptionConnectionPoolSize());
        // 初始化从节点的连接条目
        // 将从节点标记为冻结状态
        entry.setFreezed(true);
        // 将从节点添加到负载均衡中
        slaveBalancer.add(entry);
    }

     // 返回主节点的 Redis 客户端
    public RedisClient getClient() {
        return masterEntry.getClient();
    }

    // 解冻并且启用某个从节点
    public void slaveUp(String host, int port) {
         // 如果指定的节点不是当前主节点,将当前主节点标记为下线
        if (!masterEntry.getClient().getAddr().getHostName().equals(host) && port != masterEntry.getClient().getAddr().getPort()) {
            slaveDown(masterEntry.getClient().getAddr().getHostName(), masterEntry.getClient().getAddr().getPort());
        }
        // 解除指定从节点的冻结状态
        slaveBalancer.unfreeze(host, port);
    }

    // 更改主节点,将指定的从节点提升为主节点。
    // 冻结旧的主节点,并重新附加Pub/Sub监听器到其他从节点。
    public void changeMaster(String host, int port) {
        // 存旧的主节点连接条目
        ConnectionEntry oldMaster = masterEntry;
        setupMasterEntry(host, port);

        // 如果有多个可用的从节点,将新主节点标记为下线
        if (slaveBalancer.getAvailableClients() > 1) {
            slaveDown(host, port);
        }

        // 异步关闭旧的主节点客户端
        connectionManager.shutdownAsync(oldMaster.getClient());
    }
	
    
    public void shutdownMasterAsync() {
        // 异步关闭主节点客户端
        connectionManager.shutdownAsync(masterEntry.getClient());
        slaveBalancer.shutdownAsync();
    }

    // 获取用于写操作的主节点连接。如果连接池中没有可用连接,会尝试新建连接
    public RedisConnection connectionWriteOp() {
        // 获取主节点连接的信号量
        acquireMasterConnection();

        // 从连接池中获取一个连接
        RedisConnection conn = masterEntry.getConnections().poll();
        if (conn != null) {
            return conn;
        }

        // 如果获取成功,返回连接
        try {
            return masterEntry.connect(config);
        } catch (RedisException e) {
            masterEntry.getConnectionsSemaphore().release();
            throw e;
        }
    }
	
    // 获取用于读操作的从节点连接,由负载均衡器选择合适的从节点
    public RedisConnection connectionReadOp() {
        return slaveBalancer.nextConnection();
    }

    RedisPubSubConnection nextPubSubConnection() {
        return slaveBalancer.nextPubSubConnection();
    }

    void acquireMasterConnection() {
        if (!masterEntry.getConnectionsSemaphore().tryAcquire()) {
            log.warn("Master connection pool gets exhausted! Trying to acquire connection ...");
            long time = System.currentTimeMillis();

            // 阻塞直到获取信号量
            masterEntry.getConnectionsSemaphore().acquireUninterruptibly();
            long endTime = System.currentTimeMillis() - time;
            log.warn("Master connection acquired, time spended: {} ms", endTime);
        }
    }

    public void returnSubscribeConnection(PubSubConnectionEntry entry) {
        slaveBalancer.returnSubscribeConnection(entry.getConnection());
    }

    // 释放写操作连接
    public void releaseWrite(RedisConnection connection) {
        // may changed during changeMaster call
        if (!masterEntry.getClient().equals(connection.getRedisClient())) {
            connection.closeAsync();
            return;
        }

        // 将连接归还到连接池中
        masterEntry.getConnections().add(connection);
        // 释放信号量
        masterEntry.getConnectionsSemaphore().release();
    }
	
    
    // 释放读操作连接
    public void releaseRead(RedisConnection сonnection) {
        slaveBalancer.returnConnection(сonnection);
    }

    public void shutdown() {
        masterEntry.getClient().shutdown();
        slaveBalancer.shutdown();
    }

    public int getEndSlot() {
        return endSlot;
    }

    public int getStartSlot() {
        return startSlot;
    }

    public boolean isOwn(int slot) {
        return slot >= startSlot && slot <= endSlot;
    }

}