七的博客

Redisson2.0源码分析3-配置以及负载均衡策略

源码分析

Redisson2.0源码分析3-配置以及负载均衡策略

1. 配置管理

1.1 基础配置 BaseConfig

所有 Redis 连接模式下通用的参数配置。

package org.redisson;


class BaseConfig<T extends BaseConfig<T>> {

    // 节点间 Ping 超时时间,默认值为1000毫秒
    private int pingTimeout = 1000;

    //  Redis操作执行的超时时间,默认值为60000毫秒(1分钟)
    private int timeout = 60000;

    private int retryAttempts = 20;  // 重试的次数

    private int retryInterval = 1000;  // 重试间隔

    private int database = 0;  // 默认数据库

    private String password;  // 密码

    // 每个Redis连接的订阅限制
    private int subscriptionsPerConnection = 5;

    //  客户端连接的名称
    private String clientName;

    BaseConfig() {
    }

    BaseConfig(T config) {
        setPassword(config.getPassword());
        setSubscriptionsPerConnection(config.getSubscriptionsPerConnection());
        setRetryAttempts(config.getRetryAttempts());
        setRetryInterval(config.getRetryInterval());
        setDatabase(config.getDatabase());
        setTimeout(config.getTimeout());
        setClientName(config.getClientName());
        setPingTimeout(config.getPingTimeout());
    }

    // 省略 set get

}


1.2 基础主从配置 BaseMasterSlaveServersConfig

所有主从配置的基类,实现类有好几个。 这个类中的配置是一些通用的主从配置信息。

package org.redisson;

import org.redisson.connection.LoadBalancer;
import org.redisson.connection.RoundRobinLoadBalancer;

public class BaseMasterSlaveServersConfig<T extends BaseMasterSlaveServersConfig<T>> extends BaseConfig<T> {

     // 负载均衡算法配置,默认轮训
    private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();

    // 每个从节点的 Redis 发布订阅连接池大小,默认值为25
    private int slaveSubscriptionConnectionPoolSize = 25;

    // 每个从节点的Redis连接池大小
    private int slaveConnectionPoolSize = 100;

    // 主节点的Redis连接池大小
    private int masterConnectionPoolSize = 100;

    public BaseMasterSlaveServersConfig() {
    }

    BaseMasterSlaveServersConfig(T config) {
        super(config);
        setLoadBalancer(config.getLoadBalancer());
        setMasterConnectionPoolSize(config.getMasterConnectionPoolSize());
        setSlaveConnectionPoolSize(config.getSlaveConnectionPoolSize());
        setSlaveSubscriptionConnectionPoolSize(config.getSlaveSubscriptionConnectionPoolSize());
    }

     // 省略 set get
    
}

1.3 主从配置 MasterSlaveServersConfig

Redis 主从模式连接的配置。

package org.redisson;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.redisson.misc.URIBuilder;

public class MasterSlaveServersConfig extends BaseMasterSlaveServersConfig<MasterSlaveServersConfig> {

    // 从节点地址集合
    private List<URI> slaveAddresses = new ArrayList<URI>();

    // 主节点地址
    private URI masterAddress;

    public MasterSlaveServersConfig() {
    }

    MasterSlaveServersConfig(MasterSlaveServersConfig config) {
        super(config);
        setLoadBalancer(config.getLoadBalancer());
        setMasterAddress(config.getMasterAddress());
        setSlaveAddresses(config.getSlaveAddresses());
    }

    // 省略 set get

}

1.4 哨兵配置 SentinelServersConfig

配置 Redis 哨兵模式下的连接。

package org.redisson;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import org.redisson.misc.URIBuilder;

public class SentinelServersConfig extends BaseMasterSlaveServersConfig<SentinelServersConfig> {

    // 哨兵地址集合
    private List<URI> sentinelAddresses = new ArrayList<URI>();
    
    // 主节点的名称,用于识别哨兵监控的主节点
    private String masterName;

    public SentinelServersConfig() {
    }

    SentinelServersConfig(SentinelServersConfig config) {
        super(config);
        setSentinelAddresses(config.getSentinelAddresses());
        setMasterName(config.getMasterName());
    }

    // 省略 set get

}

1.5 集群配置 ClusterServersConfig

Redis 集群模式下的连接配置。

package org.redisson;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import org.redisson.misc.URIBuilder;

public class ClusterServersConfig extends BaseMasterSlaveServersConfig<ClusterServersConfig> {

    // 集群节点列表
    private List<URI> nodeAddresses = new ArrayList<URI>();

    // Redis 集群扫描间隔时间
    private int scanInterval = 1000;

    public ClusterServersConfig() {
    }

    ClusterServersConfig(ClusterServersConfig config) {
        super(config);
        setNodeAddresses(config.getNodeAddresses());
        setScanInterval(config.getScanInterval());
    }

     // 省略 set get



}

1.6 单节点配置 SingleServerConfig

单节点 Redis 的连接配置,可以配置的参数不是特别的多。

package org.redisson;

import java.net.URI;

import org.redisson.misc.URIBuilder;

public class SingleServerConfig extends BaseConfig<SingleServerConfig> {

    // Redis 服务地址
    private URI address;

    // Redis订阅连接池的大小,默认值为25
    private int subscriptionConnectionPoolSize = 25;

    // 连接池大小
    private int connectionPoolSize = 100;

    SingleServerConfig() {
    }

    SingleServerConfig(SingleServerConfig config) {
        super(config);
        setAddress(config.getAddress());
        setConnectionPoolSize(config.getConnectionPoolSize());
        setSubscriptionConnectionPoolSize(config.getSubscriptionConnectionPoolSize());
    }

     // 省略 set get
}

2. 负载均衡

这里说的负载均衡是指用于 Redis 集群或者主从模式下的多个 Redis 节点之间进行请求的负载均衡。目前实现的算法依旧为 2 个:

  • 随机选择节点。
  • 轮训选择节点策略。

负载均衡的实现跟 Redisson V1 版本基本保持一致。 不过很奇怪的是将一堆不属于负载均衡的方法写到这个接口中。

2.1 负载均衡接口 LoadBalancer

所有负载均衡策略实现的接口,这里比 V1 版本多了很多个方法定义。

package org.redisson.connection;

import java.util.Collection;

import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;

public interface LoadBalancer {
    // 获取当前可用的客户端数量
    int getAvailableClients();

    // 异步关闭
    void shutdownAsync();

    // 同步关闭
    void shutdown();

    // 解除指定IP和端口实例的冻结状态
    void unfreeze(String host, int port);

    // 冻结指定IP和端口实例的连接。 返回相关的Pub/Sub连接集合
    Collection<RedisPubSubConnection> freeze(String host, int port);

    // 初始化负载均衡器
    void init(MasterSlaveServersConfig config, ConnectionManager connectionManager);

    //  添加一个订阅连接
    void add(SubscribesConnectionEntry entry);

    // 获取下一个可用的 Redis 连接
    RedisConnection nextConnection();

    // 获取下一个可用的Pub/Sub连接
    RedisPubSubConnection nextPubSubConnection();

    // 归还一个 Redis 连接到连接池
    void returnConnection(RedisConnection connection);

    // 归还一个Pub/Sub连接到连接池
    void returnSubscribeConnection(RedisPubSubConnection connection);
}

2.2 负载均衡实现基类 BaseLoadBalancer

这个类是负责管理 Redis 连接负载均衡的抽象类,大部分负载均衡的实现都可以实现这个抽象类来达到简化实现的目的。

需要注意的是源码中的 freeze ,就是将某个 Redis 节点标记为不可用的状态。 冻结状态下的节点不会被 Redis 负载均衡选择到。

package org.redisson.connection;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.misc.ReclosableLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class BaseLoadBalancer implements LoadBalancer {

    private final Logger log = LoggerFactory.getLogger(getClass());

    private MasterSlaveServersConfig config;  // 主从服务配置

    private ConnectionManager connectionManager; // 连接管理
    
     // 客户端节点全部为空状态锁。 当所有节点都被冻结的时候,会阻塞住方法的调用。
    private final ReclosableLatch clientsEmpty = new ReclosableLatch();
    
    final Queue<SubscribesConnectionEntry> clients = new ConcurrentLinkedQueue<SubscribesConnectionEntry>();  // 客户端订阅的连接

    public void init(MasterSlaveServersConfig config, ConnectionManager connectionManager) {
        this.config = config;
        this.connectionManager = connectionManager;
    }

    // 添加订阅连接
    public synchronized void add(SubscribesConnectionEntry entry) {
        clients.add(entry);
        if (!entry.isFreezed()) {
            clientsEmpty.open();
        }
    }

    // 获取可用的客户端数量
    public int getAvailableClients() {
        int count = 0;
        for (SubscribesConnectionEntry connectionEntry : clients) {
            // 没有冻结的就是可用
            if (!connectionEntry.isFreezed()) {
                count++;
            }
        }
        return count;
    }

    public synchronized void unfreeze(String host, int port) {
        InetSocketAddress addr = new InetSocketAddress(host, port);
        for (SubscribesConnectionEntry connectionEntry : clients) {
            if (!connectionEntry.getClient().getAddr().equals(addr)) {
                continue;
            }
            // 设置冻结状态
            connectionEntry.setFreezed(false);
            clientsEmpty.open();
            return;
        }
        throw new IllegalStateException("Can't find " + addr + " in slaves!");
    }

    // 冻结指定节点,返回发布订阅连接集合
    public synchronized Collection<RedisPubSubConnection> freeze(String host, int port) {
        InetSocketAddress addr = new InetSocketAddress(host, port);
        for (SubscribesConnectionEntry connectionEntry : clients) {
            // 已经冻结过就不处理
            if (connectionEntry.isFreezed()
                    || !connectionEntry.getClient().getAddr().equals(addr)) {
                continue;
            }

            log.debug("{} freezed", addr);
            connectionEntry.setFreezed(true);

            // 关闭这个节点上所有连接
            while (true) {
                RedisConnection connection = connectionEntry.getConnections().poll();
                if (connection == null) {
                    break;
                }
                connection.closeAsync();
            }

            // 关闭所有发布订阅连接
            while (true) {
                RedisPubSubConnection connection = connectionEntry.pollFreeSubscribeConnection();
                if (connection == null) {
                    break;
                }
                connection.closeAsync();
            }

            // 查看是否全部都冻结了
            boolean allFreezed = true;
            for (SubscribesConnectionEntry entry : clients) {
                if (!entry.isFreezed()) {
                    allFreezed = false;
                    break;
                }
            }
            // 全部冻结了,关闭闭锁
            if (allFreezed) {
                clientsEmpty.close();
            }

            List<RedisPubSubConnection> list = new ArrayList<RedisPubSubConnection>(connectionEntry.getAllSubscribeConnections());
            connectionEntry.getAllSubscribeConnections().clear();
            return list;
        }

        return Collections.emptyList();
    }

    // 获取下一个可用的Pub/Sub连接
    public RedisPubSubConnection nextPubSubConnection() {
        clientsEmpty.awaitUninterruptibly();
        List<SubscribesConnectionEntry> clientsCopy = new ArrayList<SubscribesConnectionEntry>(clients);
        while (true) {
            if (clientsCopy.isEmpty()) {
                throw new RedisConnectionException("Slave subscribe-connection pool gets exhausted!");
            }

            int index = getIndex(clientsCopy);
            SubscribesConnectionEntry entry = clientsCopy.get(index);

            if (entry.isFreezed()
                    || !entry.getSubscribeConnectionsSemaphore().tryAcquire()) {
                clientsCopy.remove(index);
            } else {
                try {
                    RedisPubSubConnection conn = entry.pollFreeSubscribeConnection();
                    if (conn != null) {
                        return conn;
                    }
                    conn = entry.getClient().connectPubSub();
                    if (config.getPassword() != null) {
                        conn.sync(RedisCommands.AUTH, config.getPassword());
                    }
                    if (config.getDatabase() != 0) {
                        conn.sync(RedisCommands.SELECT, config.getDatabase());
                    }
                    if (config.getClientName() != null) {
                        conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
                    }

                    entry.registerSubscribeConnection(conn);
                    return conn;
                } catch (RedisConnectionException e) {
                    entry.getSubscribeConnectionsSemaphore().release();
                    // TODO connection scoring
                    log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr());
                    clientsCopy.remove(index);
                }
            }
        }
    }

    // 获取下一个可用的普通Redis连接
    public RedisConnection nextConnection() {
        clientsEmpty.awaitUninterruptibly();
        List<SubscribesConnectionEntry> clientsCopy = new ArrayList<SubscribesConnectionEntry>(clients);
        while (true) {
            if (clientsCopy.isEmpty()) {
                throw new RedisConnectionException("Slave connection pool gets exhausted!");
            }

            int index = getIndex(clientsCopy);
            SubscribesConnectionEntry entry = clientsCopy.get(index);

            if (entry.isFreezed()
                    || !entry.getConnectionsSemaphore().tryAcquire()) {
                clientsCopy.remove(index);
            } else {
                RedisConnection conn = entry.getConnections().poll();
                if (conn != null) {
                    return conn;
                }
                try {
                    return entry.connect(config);
                } catch (RedisException e) {
                    entry.getConnectionsSemaphore().release();
                    // TODO connection scoring
                    log.warn("Can't connect to {}, trying next connection!", entry.getClient().getAddr());
                    clientsCopy.remove(index);
                }
            }
        }
    }

    // 获取下一个连接的索引
    abstract int getIndex(List<SubscribesConnectionEntry> clientsCopy);

    // 归还一个Pub/Sub连接到连接池
    public void returnSubscribeConnection(RedisPubSubConnection connection) {
        for (SubscribesConnectionEntry entry : clients) {
            if (entry.getClient().equals(connection.getRedisClient())) {
                if (entry.isFreezed()) {
                    connection.closeAsync();
                } else {
                    entry.offerFreeSubscribeConnection(connection);
                }
                entry.getSubscribeConnectionsSemaphore().release();
                break;
            }
        }
    }

    // 归还一个普通Redis连接到连接池
    public void returnConnection(RedisConnection connection) {
        for (SubscribesConnectionEntry entry : clients) {
            if (entry.getClient().equals(connection.getRedisClient())) {
                if (entry.isFreezed()) {
                    connection.closeAsync();
                } else {
                    entry.getConnections().add(connection);
                }
                entry.getConnectionsSemaphore().release();
                break;
            }
        }
    }

    // 同步关闭所有连接
    public void shutdown() {
        for (SubscribesConnectionEntry entry : clients) {
            entry.getClient().shutdown();
        }
    }

    // 异步关闭所有连接
    public void shutdownAsync() {
        for (SubscribesConnectionEntry entry : clients) {
            connectionManager.shutdownAsync(entry.getClient());
        }
    }

}

2.3 随机策略 RandomLoadBalancer

随机策略就是每次请求随机分配给一个节点。

package org.redisson.connection;

import java.util.List;
import java.util.Random;

public class RandomLoadBalancer extends BaseLoadBalancer {

    private final Random random = new Random();

    int getIndex(List<SubscribesConnectionEntry> clientsCopy) {
        // 随机从实例下标中取一个下标
        return random.nextInt(clientsCopy.size());
    }

}

2.4 轮训策略 RoundRobinLoadBalancer

轮训策略就是按顺序,将请求分配给集群中的每一个节点。每个 Redis 节点依次接收请求,直到所有节点都接收过请求后,再从第一个节点重新开始。

package org.redisson.connection;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinLoadBalancer extends BaseLoadBalancer {

    private final AtomicInteger index = new AtomicInteger(-1);  // 自增的下标数值

    @Override
    int getIndex(List<SubscribesConnectionEntry> clientsCopy) {
        // 每次调用时,通过index.incrementAndGet()对索引进行递增操作
        // 对自增值进行取模,确保下标始终在有效范围内
        return Math.abs(index.incrementAndGet() % clientsCopy.size());
    }

}

这种策略比较适用于所有 Reids 节点的性能能力相近的情况。